bitkeeper revision 1.1236.35.1 (42371b91vqaZiam66I7Q_46q67kWeg)
authorjrb44@plym.cl.cam.ac.uk <jrb44@plym.cl.cam.ac.uk>
Tue, 15 Mar 2005 17:29:53 +0000 (17:29 +0000)
committerjrb44@plym.cl.cam.ac.uk <jrb44@plym.cl.cam.ac.uk>
Tue, 15 Mar 2005 17:29:53 +0000 (17:29 +0000)
Added blockstored and initial distributed parallax backend.

.rootkeys
BitKeeper/etc/ignore
BitKeeper/etc/logging_ok
tools/blktap/Makefile
tools/blktap/blockstore.c
tools/blktap/blockstore.h
tools/blktap/blockstored.c [new file with mode: 0644]
tools/blktap/bstest.c [new file with mode: 0644]

index ddfd684dfa634afa4dba4d102f112ee1fe6479a8..56d56e9cdef8fe3451b80b2f7592ca77af5e7c50 100644 (file)
--- a/.rootkeys
+++ b/.rootkeys
 42090340C-WkRPT7N3t-8Lzehzogdw tools/blktap/blktaplib.h
 42277b02WrfP1meTDPv1M5swFq8oHQ tools/blktap/blockstore.c
 42277b02P1C0FYj3gqwTZUD8sxKCug tools/blktap/blockstore.h
+42371b8aL1JsxAXOd4bBhmZKDyjiJg tools/blktap/blockstored.c
+42371b8aD_x3L9MKsXciMNqkuk58eQ tools/blktap/bstest.c
 42090340B3mDvcxvd9ehDHUkg46hvw tools/blktap/libgnbd/Makefile
 42090340ZWkc5Xhf9lpQmDON8HJXww tools/blktap/libgnbd/gnbdtest.c
 42090340ocMiUScJE3OpY7QNunvSbg tools/blktap/libgnbd/libgnbd.c
index e7c52f6ba5668e7c4e7fb2aa3023e3b344d82ba7..1d370131c1a0a77121d8ca789ec28cb79e69e986 100644 (file)
@@ -124,3 +124,5 @@ tools/blktap/vdi_validate
 tools/blktap/xen/*
 tools/cmdline/*
 tools/tests/test_x86_emulator
+tools/blktap/blockstored
+tools/blktap/bstest
index d3fabe9b7d9063bfa3f544a11696ab862dad10fb..e5566aa7b049644e04c47e6a04ad37da7c3ef82b 100644 (file)
@@ -33,6 +33,7 @@ iap10@pb001.cl.cam.ac.uk
 iap10@pb007.cl.cam.ac.uk
 iap10@striker.cl.cam.ac.uk
 iap10@tetris.cl.cam.ac.uk
+jrb44@plym.cl.cam.ac.uk
 jws22@gauntlet.cl.cam.ac.uk
 jws@cairnwell.research
 kaf24@camelot.eng.3leafnetworks.com
index 50d77b905b3c3e539475cd288071b9179c6a2a61..9fdbbde792875e393117192b4755cdf0c486ecd8 100644 (file)
@@ -141,6 +141,10 @@ vdi_fill: $(LIB) vdi_fill.c $(VDI_SRCS)
 vdi_validate: $(LIB) vdi_validate.c $(VDI_SRCS)
        $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c $(VDI_SRCS)
 
+blockstored: blockstored.c
+       $(CC) $(CFLAGS) -g3 -o blockstored blockstored.c
+bstest: bstest.c blockstore.c
+       $(CC) $(CFLAGS) -g3 -o bstest bstest.c blockstore.c
 
 rdx_cmp: $(LIB) rdx_cmp.c $(VDI_SRCS)
        $(CC) $(CFLAGS) -g3 -o rdx_cmp rdx_cmp.c $(VDI_SRCS)
index 179fcdc3c3d29dfa46acb34ed334ab7c4b99dd1f..079c45576bff7d49a34eca853b753f84dd0c85fb 100644 (file)
 #include <sys/stat.h>
 #include "blockstore.h"
 
+#define BLOCKSTORE_REMOTE
+
+#ifdef BLOCKSTORE_REMOTE
+
+//#define BSDEBUG
+
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <netdb.h>
+
+#define ENTER_QUEUE_CR (void)0
+#define LEAVE_QUEUE_CR (void)0
+
+bsserver_t bsservers[MAX_SERVERS];
+bscluster_t bsclusters[MAX_CLUSTERS];
+
+struct sockaddr_in sin_local;
+int bssock = 0;
+
+/* One outstanding request to a blockstore server, together with the
+ * scatter/gather state used to send it and to receive the reply.
+ */
+typedef struct bsq_t_struct {
+    /* list links; nothing in the visible code links entries yet */
+    struct bsq_t_struct *prev;
+    struct bsq_t_struct *next;
+    /* index into bsservers[] of the target server */
+    int server;
+    /* intended request length, set by callers; only the commented-out
+     * sendto() path in send_message() reads it */
+    int length;
+    /* filled in afresh by send_message() and recv_message() on every call */
+    struct msghdr msghdr;
+    /* iov[0] covers 'message'; iov[1] covers 'block' when it is non-NULL */
+    struct iovec iov[2];
+    /* request header, overwritten in place by the reply header */
+    bshdr_t message;
+    /* BLOCK_SIZE payload buffer, or NULL for a header-only transfer */
+    void *block;
+} bsq_t;
+
+bsq_t *bs_head = NULL;
+bsq_t *bs_tail = NULL;
+
+/**
+ * send_message: transmit a request to the server named by qe->server
+ *   @qe: request; qe->message is always sent, followed by qe->block
+ *        (BLOCK_SIZE bytes) when qe->block is non-NULL
+ *
+ *   @return: result of sendmsg() (bytes sent, negative on error)
+ */
+int send_message(bsq_t *qe) {
+    struct msghdr *mh = &(qe->msghdr);
+    int has_block = (qe->block != NULL);
+    int sent;
+
+    /* The header always goes in the first iovec, the payload (if any)
+     * in the second.
+     */
+    qe->iov[0].iov_base = (void *)&(qe->message);
+    qe->iov[0].iov_len = MSGBUFSIZE_ID;
+    if (has_block) {
+        qe->iov[1].iov_base = qe->block;
+        qe->iov[1].iov_len = BLOCK_SIZE;
+    }
+
+    mh->msg_name = (void *)&(bsservers[qe->server].sin);
+    mh->msg_namelen = sizeof(struct sockaddr_in);
+    mh->msg_iov = qe->iov;
+    mh->msg_iovlen = has_block ? 2 : 1;
+    mh->msg_control = NULL;
+    mh->msg_controllen = 0;
+    mh->msg_flags = 0;
+
+    sent = sendmsg(bssock, mh, 0);
+    if (sent >= 0) {
+        /* placeholder critical section for future request queueing */
+        ENTER_QUEUE_CR;
+        LEAVE_QUEUE_CR;
+    }
+
+    return sent;
+}
+
+/**
+ * recv_message: receive the next datagram on the blockstore socket
+ *   @qe: request whose header (and block buffer, when qe->block is
+ *        non-NULL) is overwritten with the received data
+ *
+ *   @return: result of recvmsg() (bytes received, negative on error)
+ */
+int recv_message(bsq_t *qe) {
+    struct sockaddr_in from;
+    struct msghdr *mh = &(qe->msghdr);
+    int has_block = (qe->block != NULL);
+
+    /* Header lands in the first iovec, payload (if expected) in the
+     * second.
+     */
+    qe->iov[0].iov_base = (void *)&(qe->message);
+    qe->iov[0].iov_len = MSGBUFSIZE_ID;
+    if (has_block) {
+        qe->iov[1].iov_base = qe->block;
+        qe->iov[1].iov_len = BLOCK_SIZE;
+    }
+
+    /* The sender's address is collected but not examined. */
+    mh->msg_name = &from;
+    mh->msg_namelen = sizeof(struct sockaddr_in);
+    mh->msg_iov = qe->iov;
+    mh->msg_iovlen = has_block ? 2 : 1;
+    mh->msg_control = NULL;
+    mh->msg_controllen = 0;
+    mh->msg_flags = 0;
+
+    return recvmsg(bssock, mh, 0);
+}
+
+/**
+ * readblock_indiv: fetch one block from a single blockstore server
+ *   @server: index into bsservers[] of the server to ask
+ *   @id: server-local block id
+ *
+ *   @return: malloc'd BLOCK_SIZE buffer holding the block (the caller
+ *            owns it and must free it), NULL on error
+ */
+void *readblock_indiv(int server, u64 id) {
+    void *block;
+    bsq_t *qe;
+    int len;
+
+    qe = (bsq_t *)malloc(sizeof(bsq_t));
+    if (!qe) {
+        perror("readblock qe malloc");
+        return NULL;
+    }
+    /* The reply payload is received straight into this buffer, which is
+     * then handed to the caller; no second buffer or copy is needed.
+     */
+    block = malloc(BLOCK_SIZE);
+    if (!block) {
+        perror("readblock qe malloc");
+        free((void *)qe);
+        return NULL;
+    }
+
+    qe->server = server;
+
+    qe->message.operation = BSOP_READBLOCK;
+    qe->message.flags = 0;
+    qe->message.id = id;
+    qe->length = MSGBUFSIZE_ID;
+
+    /* A read request is header-only: keep the (still uninitialised)
+     * receive buffer detached while sending so its contents are not put
+     * on the wire.
+     */
+    qe->block = NULL;
+    if (send_message(qe) < 0) {
+        perror("readblock sendto");
+        goto err;
+    }
+
+    qe->block = block;
+    len = recv_message(qe);
+    if (len < 0) {
+        perror("readblock recv");
+        goto err;
+    }
+    if ((qe->message.flags & BSOP_FLAG_ERROR)) {
+        fprintf(stderr, "readblock server error\n");
+        goto err;
+    }
+    if (len < MSGBUFSIZE_BLOCK) {
+        fprintf(stderr, "readblock recv short (%u)\n", len);
+        goto err;
+    }
+
+    free((void *)qe);
+    return block;
+
+    err:
+    free(block);
+    free((void *)qe);
+    return NULL;
+}
+
+/**
+ * readblock: read a block from the distributed store
+ *   @id: composite block id (see the BSID_* macros)
+ *
+ * Successive calls rotate through the replicas of the block's cluster.
+ *
+ *   @return: pointer to block, NULL on error
+ */
+void *readblock(u64 id) {
+    /* replica used by the previous call; starts so the first call uses 0 */
+    static int replica = CLUSTER_MAX_REPLICAS - 1;
+    int map = (int)BSID_MAP(id);
+    void *block = NULL;
+    u64 xid;
+
+    if (id >= 6) {
+        replica = (replica + 1 >= CLUSTER_MAX_REPLICAS) ? 0 : replica + 1;
+
+        /* pick the per-replica id matching the chosen server */
+        if (replica == 0)
+            xid = BSID_REPLICA0(id);
+        else if (replica == 1)
+            xid = BSID_REPLICA1(id);
+        else if (replica == 2)
+            xid = BSID_REPLICA2(id);
+
+        block = readblock_indiv(bsclusters[map].servers[replica], xid);
+    } else {
+        /* special case for the "superblock" just use the first block on
+         * the first replica. (extend to blocks < 6 for vdi bug)
+         */
+        block = readblock_indiv(bsclusters[map].servers[0], id);
+    }
+
+#ifdef BSDEBUG
+    if (block)
+        fprintf(stderr, "READ:  %016llx %02x%02x %02x%02x %02x%02x %02x%02x\n",
+                id,
+                (unsigned int)((unsigned char *)block)[0],
+                (unsigned int)((unsigned char *)block)[1],
+                (unsigned int)((unsigned char *)block)[2],
+                (unsigned int)((unsigned char *)block)[3],
+                (unsigned int)((unsigned char *)block)[4],
+                (unsigned int)((unsigned char *)block)[5],
+                (unsigned int)((unsigned char *)block)[6],
+                (unsigned int)((unsigned char *)block)[7]);
+    else
+        fprintf(stderr, "READ:  %016llx NULL\n", id);
+#endif
+    return block;
+}
+
+/**
+ * writeblock_indiv: write one block to a single blockstore server
+ *   @server: index into bsservers[] of the server to write to
+ *   @id: server-local block id
+ *   @block: BLOCK_SIZE bytes to write (not modified, not freed)
+ *
+ *   @return: zero on success, -1 on failure
+ */
+int writeblock_indiv(int server, u64 id, void *block) {
+    bsq_t *qe;
+    int len;
+
+    qe = (bsq_t *)malloc(sizeof(bsq_t));
+    if (!qe) {
+        perror("writeblock qe malloc");
+        goto err;
+    }
+    qe->server = server;
+
+    qe->message.operation = BSOP_WRITEBLOCK;
+    qe->message.flags = 0;
+    qe->message.id = id;
+    qe->block = block;
+    qe->length = MSGBUFSIZE_BLOCK;
+
+    if (send_message(qe) < 0) {
+        perror("writeblock sendto");
+        goto err;
+    }
+
+    /* The acknowledgement is header-only.  Detach the caller's buffer
+     * before receiving so an oversized or stale datagram cannot be
+     * scattered into (and corrupt) the caller's data.
+     */
+    qe->block = NULL;
+    len = recv_message(qe);
+    if (len < 0) {
+        perror("writeblock recv");
+        goto err;
+    }
+    if ((qe->message.flags & BSOP_FLAG_ERROR)) {
+        fprintf(stderr, "writeblock server error\n");
+        goto err;
+    }
+    if (len < MSGBUFSIZE_ID) {
+        fprintf(stderr, "writeblock recv short (%u)\n", len);
+        goto err;
+    }
+
+    free((void *)qe);
+    return 0;
+
+    err:
+    free((void *)qe);   /* free(NULL) is a no-op on the malloc-failure path */
+    return -1;
+}
+
+/**
+ * writeblock: write an existing block to every replica of its cluster
+ *   @id: composite block id (see the BSID_* macros)
+ *   @block: pointer to block
+ *
+ * Replicas are written in order; the first failure aborts the write and
+ * leaves later replicas untouched.
+ *
+ *   @return: zero on success, -1 on failure
+ */
+int writeblock(u64 id, void *block) {
+    int *srv = bsclusters[(int)BSID_MAP(id)].servers;
+
+#ifdef BSDEBUG
+    fprintf(stderr,
+            "WRITE: %016llx %02x%02x %02x%02x %02x%02x %02x%02x\n",
+            id,
+            (unsigned int)((unsigned char *)block)[0],
+            (unsigned int)((unsigned char *)block)[1],
+            (unsigned int)((unsigned char *)block)[2],
+            (unsigned int)((unsigned char *)block)[3],
+            (unsigned int)((unsigned char *)block)[4],
+            (unsigned int)((unsigned char *)block)[5],
+            (unsigned int)((unsigned char *)block)[6],
+            (unsigned int)((unsigned char *)block)[7]);
+#endif
+
+    /* special case for the "superblock" just use the first block on the
+     * first replica. (extend to blocks < 6 for vdi bug)
+     */
+    if (id < 6)
+        return writeblock_indiv(srv[0], id, block);
+
+    if (writeblock_indiv(srv[0], BSID_REPLICA0(id), block) < 0 ||
+        writeblock_indiv(srv[1], BSID_REPLICA1(id), block) < 0 ||
+        writeblock_indiv(srv[2], BSID_REPLICA2(id), block) < 0)
+        return -1;
+
+    return 0;
+}
+
+/**
+ * allocblock: store a new block without a placement preference
+ *   @block: pointer to block
+ *
+ *   @return: id of the new block, as returned by allocblock_hint()
+ *            called with a hint of 0
+ */
+u64 allocblock(void *block) {
+    u64 new_id = allocblock_hint(block, 0);
+
+    return new_id;
+}
+
+/**
+ * allocblock_hint_indiv: allocate a new block on a single server
+ *   @server: index into bsservers[] of the server to allocate on
+ *   @block: BLOCK_SIZE bytes of initial contents (not modified, not freed)
+ *   @hint: value placed in the request's id field
+ *
+ *   @return: server-local id of the new block, 0 on failure
+ */
+u64 allocblock_hint_indiv(int server, void *block, u64 hint) {
+    bsq_t *qe;
+    int len;
+    u64 id;
+
+    qe = (bsq_t *)malloc(sizeof(bsq_t));
+    if (!qe) {
+        perror("allocblock_hint qe malloc");
+        goto err;
+    }
+    qe->server = server;
+
+    qe->message.operation = BSOP_ALLOCBLOCK;
+    qe->message.flags = 0;
+    qe->message.id = hint;
+    qe->block = block;
+    qe->length = MSGBUFSIZE_BLOCK;
+
+    if (send_message(qe) < 0) {
+        perror("allocblock_hint sendto");
+        goto err;
+    }
+
+    /* The reply is header-only.  Detach the caller's buffer before
+     * receiving so an oversized or stale datagram cannot overwrite the
+     * caller's data.
+     */
+    qe->block = NULL;
+    len = recv_message(qe);
+    if (len < 0) {
+        perror("allocblock_hint recv");
+        goto err;
+    }
+    if ((qe->message.flags & BSOP_FLAG_ERROR)) {
+        fprintf(stderr, "allocblock_hint server error\n");
+        goto err;
+    }
+    if (len < MSGBUFSIZE_ID) {
+        fprintf(stderr, "allocblock_hint recv short (%u)\n", len);
+        goto err;
+    }
+
+    /* Copy the id out before releasing qe; reading it after free() is a
+     * use-after-free.
+     */
+    id = qe->message.id;
+    free((void *)qe);
+    return id;
+
+    err:
+    free((void *)qe);   /* free(NULL) is a no-op on the malloc-failure path */
+    return 0;
+}
+
+/**
+ * allocblock_hint: write a new block to every replica of a cluster
+ *   @block: pointer to block
+ *   @hint: allocation hint; used as the index of the cluster (into
+ *          bsclusters[]) that should hold the block
+ *
+ * If a later replica fails, blocks already allocated on earlier replicas
+ * are not released.
+ *
+ *   @return: new composite id of block, 0 on failure
+ */
+u64 allocblock_hint(void *block, u64 hint) {
+    int map;
+    int rep0, rep1, rep2;
+    u64 id0, id1, id2;
+
+    /* hint indexes bsclusters[]; reject values outside the table instead
+     * of reading past its end.
+     */
+    if (hint >= MAX_CLUSTERS) {
+        fprintf(stderr, "allocblock_hint: bad cluster hint\n");
+        return 0;
+    }
+    map = (int)hint;
+
+    rep0 = bsclusters[map].servers[0];
+    rep1 = bsclusters[map].servers[1];
+    rep2 = bsclusters[map].servers[2];
+
+    id0 = allocblock_hint_indiv(rep0, block, 0);
+    if (id0 == 0)
+        return 0;
+    id1 = allocblock_hint_indiv(rep1, block, 0);
+    if (id1 == 0)
+        return 0;
+    id2 = allocblock_hint_indiv(rep2, block, 0);
+    if (id2 == 0)
+        return 0;
+
+    /* Each replica id gets a 20-bit field in the composite id; a wider
+     * value would spill into the neighbouring field.
+     */
+    if ((id0 | id1 | id2) > 0xfffffULL) {
+        fprintf(stderr, "allocblock_hint: replica id exceeds 20 bits\n");
+        return 0;
+    }
+
+#ifdef BSDEBUG
+    fprintf(stderr, "ALLOC: %016llx %02x%02x %02x%02x %02x%02x %02x%02x\n",
+            BSID(map, id0, id1, id2),
+            (unsigned int)((unsigned char *)block)[0],
+            (unsigned int)((unsigned char *)block)[1],
+            (unsigned int)((unsigned char *)block)[2],
+            (unsigned int)((unsigned char *)block)[3],
+            (unsigned int)((unsigned char *)block)[4],
+            (unsigned int)((unsigned char *)block)[5],
+            (unsigned int)((unsigned char *)block)[6],
+            (unsigned int)((unsigned char *)block)[7]);
+#endif
+
+    return BSID(map, id0, id1, id2);
+}
+
+#else /* /BLOCKSTORE_REMOTE */
+
 static int block_fp = -1;
  
 /**
@@ -94,6 +496,18 @@ u64 allocblock(void *block) {
     return lb;
 }
 
+/**
+ * allocblock_hint: write a new block to the local store
+ *   @block: pointer to block
+ *   @hint: allocation hint (ignored by the local store)
+ *
+ *   @return: new id of block on disk
+ */
+u64 allocblock_hint(void *block, u64 hint) {
+    u64 new_id;
+
+    (void)hint;
+    new_id = allocblock(block);
+    return new_id;
+}
+
+#endif /* BLOCKSTORE_REMOTE */
 
 /**
  * newblock: get a new in-memory block set to zeros
@@ -124,12 +538,92 @@ void freeblock(void *block) {
 
 int __init_blockstore(void)
 {
+#ifdef BLOCKSTORE_REMOTE
+    struct hostent *addr;
+    int i;
+
+    bsservers[0].hostname = "firebug.cl.cam.ac.uk";
+    bsservers[1].hostname = "tetris.cl.cam.ac.uk";
+    bsservers[2].hostname = "donkeykong.cl.cam.ac.uk";
+    bsservers[3].hostname = "gunfighter.cl.cam.ac.uk";
+    bsservers[4].hostname = "galaxian.cl.cam.ac.uk";
+    bsservers[5].hostname = "firetrack.cl.cam.ac.uk";
+    bsservers[6].hostname = "funfair.cl.cam.ac.uk";
+    bsservers[7].hostname = "felix.cl.cam.ac.uk";
+    bsservers[8].hostname = NULL;
+    bsservers[9].hostname = NULL;
+    bsservers[10].hostname = NULL;
+    bsservers[11].hostname = NULL;
+    bsservers[12].hostname = NULL;
+    bsservers[13].hostname = NULL;
+    bsservers[14].hostname = NULL;
+    bsservers[15].hostname = NULL;
+
+    for (i = 0; i < MAX_SERVERS; i++) {
+        if (!bsservers[i].hostname)
+            continue;
+        addr = gethostbyname(bsservers[i].hostname);
+        if (!addr) {
+            perror("bad hostname");
+            return -1;
+        }
+        bsservers[i].sin.sin_family = addr->h_addrtype;
+        bsservers[i].sin.sin_port = htons(BLOCKSTORED_PORT);
+        bsservers[i].sin.sin_addr.s_addr = 
+            ((struct in_addr *)(addr->h_addr))->s_addr;
+    }
+
+    /* Cluster map
+     */
+    bsclusters[0].servers[0] = 0;
+    bsclusters[0].servers[1] = 1;
+    bsclusters[0].servers[2] = 2;
+    bsclusters[1].servers[0] = 1;
+    bsclusters[1].servers[1] = 2;
+    bsclusters[1].servers[2] = 3;
+    bsclusters[2].servers[0] = 2;
+    bsclusters[2].servers[1] = 3;
+    bsclusters[2].servers[2] = 4;
+    bsclusters[3].servers[0] = 3;
+    bsclusters[3].servers[1] = 4;
+    bsclusters[3].servers[2] = 5;
+    bsclusters[4].servers[0] = 4;
+    bsclusters[4].servers[1] = 5;
+    bsclusters[4].servers[2] = 6;
+    bsclusters[5].servers[0] = 5;
+    bsclusters[5].servers[1] = 6;
+    bsclusters[5].servers[2] = 7;
+    bsclusters[6].servers[0] = 6;
+    bsclusters[6].servers[1] = 7;
+    bsclusters[6].servers[2] = 0;
+    bsclusters[7].servers[0] = 7;
+    bsclusters[7].servers[1] = 0;
+    bsclusters[7].servers[2] = 1;
+
+    /* Local socket set up
+     */
+    bssock = socket(AF_INET, SOCK_DGRAM, 0);
+    if (bssock < 0) {
+        perror("Bad socket");
+        return -1;
+    }
+    memset(&sin_local, 0, sizeof(sin_local));
+    sin_local.sin_family = AF_INET;
+    sin_local.sin_port = htons(BLOCKSTORED_PORT);
+    sin_local.sin_addr.s_addr = htonl(INADDR_ANY);
+    if (bind(bssock, (struct sockaddr *)&sin_local, sizeof(sin_local)) < 0) {
+        perror("bind");
+        close(bssock);
+        return -1;
+    }
+
+#else /* /BLOCKSTORE_REMOTE */
     block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644);
 
     if (block_fp < 0) {
         perror("open");
         return -1;
     }
-    
+#endif /*  BLOCKSTORE_REMOTE */   
     return 0;
 }
index 0e531c5ab420c17ef3ae477ccc037c8b7085422b..1f805daafe59a74cf079fc028724b9ccbdbf9162 100644 (file)
@@ -9,6 +9,7 @@
 #ifndef __BLOCKSTORE_H__
 #define __BLOCKSTORE_H__
 
+#include <netinet/in.h>
 #include <xc.h>
 
 #define BLOCK_SIZE  4096
 extern void *newblock();
 extern void *readblock(u64 id);
 extern u64 allocblock(void *block);
+extern u64 allocblock_hint(void *block, u64 hint);
 extern int writeblock(u64 id, void *block);
 extern void freeblock(void *block);
 extern int __init_blockstore(void);
 
+#define ALLOCFAIL (((u64)(-1)))
+
+/* Distribution
+ */
+#define BLOCKSTORED_PORT 9346
+
+/* Header that starts every blockstore request and reply.  The struct is
+ * packed and sent as it sits in memory; the visible code performs no
+ * byte-order conversion on these fields.
+ */
+struct bshdr_t_struct {
+    /* one of the BSOP_* operation codes */
+    u32            operation;
+    /* BSOP_FLAG_* bits; the server sets BSOP_FLAG_ERROR in failed replies */
+    u32            flags;
+    /* block id for read/write; allocation hint in an alloc request and
+     * the newly allocated id in its reply */
+    u64            id;
+} __attribute__ ((packed));
+typedef struct bshdr_t_struct bshdr_t;
+
+/* Largest message on the wire: a header followed immediately by one
+ * block of payload.
+ */
+struct bsmsg_t_struct {
+    /* common request/reply header */
+    bshdr_t        hdr;
+    /* block contents; present in write/alloc requests and read replies */
+    unsigned char  block[BLOCK_SIZE];
+} __attribute__ ((packed));
+
+typedef struct bsmsg_t_struct bsmsg_t;
+
+#define MSGBUFSIZE_OP    sizeof(u32)
+#define MSGBUFSIZE_FLAGS (sizeof(u32) + sizeof(u32))
+#define MSGBUFSIZE_ID    (sizeof(u32) + sizeof(u32) + sizeof(u64))
+#define MSGBUFSIZE_BLOCK sizeof(bsmsg_t)
+
+#define BSOP_READBLOCK  0x01
+#define BSOP_WRITEBLOCK 0x02
+#define BSOP_ALLOCBLOCK 0x03
+
+#define BSOP_FLAG_ERROR 0x01
+
+#define BS_ALLOC_SKIP 10
+#define BS_ALLOC_HACK
+
+/* Remote hosts and cluster map - XXX need to generalise
+ */
+
+/*
+
+  Interim ID format is
+
+  63 60 59                40 39                20 19                 0
+  +----+--------------------+--------------------+--------------------+
+  |map | replica 2          | replica 1          | replica 0          |
+  +----+--------------------+--------------------+--------------------+
+
+  The map is an index into a table detailing which machines form the
+  cluster.
+
+ */
+
+#define BSID_REPLICA0(_id) ((_id)&0xfffffULL)
+#define BSID_REPLICA1(_id) (((_id)>>20)&0xfffffULL)
+#define BSID_REPLICA2(_id) (((_id)>>40)&0xfffffULL)
+#define BSID_MAP(_id)      (((_id)>>60)&0xfULL)
+
+#define BSID(_map, _rep0, _rep1, _rep2) ((((u64)(_map))<<60) | \
+                                         (((u64)(_rep2))<<40) | \
+                                         (((u64)(_rep1))<<20) | ((u64)(_rep0)))
+
+/* One blockstore server known to the client. */
+typedef struct bsserver_t_struct {
+    /* host name to resolve; NULL marks an unused slot */
+    char              *hostname;
+    /* resolved address and port that requests are sent to */
+    struct sockaddr_in sin;
+} bsserver_t;
+
+#define MAX_SERVERS 16
+
+#define CLUSTER_MAX_REPLICAS 3
+/* One cluster: the servers that each hold a replica of its blocks. */
+typedef struct bscluster_t_struct {
+    /* server indices (into the server table), one per replica */
+    int servers[CLUSTER_MAX_REPLICAS];
+} bscluster_t;
+
+#define MAX_CLUSTERS 16
+
 #endif /* __BLOCKSTORE_H__ */
diff --git a/tools/blktap/blockstored.c b/tools/blktap/blockstored.c
new file mode 100644 (file)
index 0000000..6b86cb1
--- /dev/null
@@ -0,0 +1,276 @@
+/**************************************************************************
+ * 
+ * blockstored.c
+ *
+ * Block store daemon.
+ *
+ */
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include "blockstore.h"
+
+//#define BSDEBUG
+
+int readblock_into(u64 id, void *block);
+
+/**
+ * open_socket: create a UDP socket bound to a port on all interfaces
+ *   @port: local port number, in host byte order
+ *
+ *   @return: socket descriptor, -1 on error
+ */
+int open_socket(u16 port) {
+    struct sockaddr_in local;
+    int fd = socket(AF_INET, SOCK_DGRAM, 0);
+
+    if (fd < 0) {
+        perror("Bad socket");
+        return -1;
+    }
+
+    memset(&local, 0, sizeof(local));
+    local.sin_family = AF_INET;
+    local.sin_port = htons(port);
+    local.sin_addr.s_addr = htonl(INADDR_ANY);
+
+    if (bind(fd, (struct sockaddr *)&local, sizeof(local)) >= 0)
+        return fd;
+
+    perror("bind");
+    close(fd);
+    return -1;
+}
+
+static int block_fp = -1;
+static int bssock = -1;
+
+/**
+ * send_reply: send a reply datagram on the daemon's socket
+ *   @peer: address to send to
+ *   @buffer: start of the message
+ *   @len: number of bytes of @buffer to send
+ *
+ *   @return: 0 on success, 1 if sendto() failed
+ */
+int send_reply(struct sockaddr_in *peer, void *buffer, int len) {
+#ifdef BSDEBUG
+    fprintf(stdout, "TX: %u bytes op=%u id=0x%llx\n",
+            len, ((bsmsg_t *)buffer)->hdr.operation, ((bsmsg_t *)buffer)->hdr.id);
+#endif
+    if (sendto(bssock, buffer, len, 0,
+               (struct sockaddr *)peer, sizeof(*peer)) >= 0)
+        return 0;
+
+    perror("send_reply");
+    return 1;
+}
+
+static bsmsg_t msgbuf;
+
+/**
+ * service_loop: receive blockstore requests and answer them, forever
+ *
+ * Each datagram is read into the static msgbuf, dispatched on its
+ * operation code, and answered from the same buffer.  Failed operations
+ * are answered with a header-only reply carrying BSOP_FLAG_ERROR; short
+ * packets and unknown operations are logged and dropped.
+ */
+void service_loop(void) {
+
+    for (;;) {
+        int rc, len;
+        struct sockaddr_in from;
+        /* recvfrom() takes a socklen_t *; a size_t has a different width
+         * on LP64 targets.
+         */
+        socklen_t slen = sizeof(from);
+        u64 bid;
+
+        len = recvfrom(bssock, (void *)&msgbuf, sizeof(msgbuf), 0,
+                       (struct sockaddr *)&from, &slen);
+
+        if (len < 0) {
+            perror("recvfrom");
+            continue;
+        }
+
+        if ((size_t)len < MSGBUFSIZE_OP) {
+            fprintf(stderr, "Short packet.\n");
+            continue;
+        }
+
+#ifdef BSDEBUG
+        fprintf(stdout, "RX: %u bytes op=%u id=0x%llx\n",
+                len, msgbuf.hdr.operation, msgbuf.hdr.id);
+#endif
+
+        switch (msgbuf.hdr.operation) {
+        case BSOP_READBLOCK:
+            /* request is header-only; the reply carries the block */
+            if ((size_t)len < MSGBUFSIZE_ID) {
+                fprintf(stderr, "Short packet (readblock %u).\n", len);
+                continue;
+            }
+            rc = readblock_into(msgbuf.hdr.id, msgbuf.block);
+            if (rc < 0) {
+                fprintf(stderr, "readblock error\n");
+                msgbuf.hdr.flags = BSOP_FLAG_ERROR;
+                send_reply(&from, (void *)&msgbuf, MSGBUFSIZE_ID);
+                continue;
+            }
+            msgbuf.hdr.flags = 0;
+            send_reply(&from, (void *)&msgbuf, MSGBUFSIZE_BLOCK);
+            break;
+        case BSOP_WRITEBLOCK:
+            /* request carries the block; the reply is header-only */
+            if ((size_t)len < MSGBUFSIZE_BLOCK) {
+                fprintf(stderr, "Short packet (writeblock %u).\n", len);
+                continue;
+            }
+            rc = writeblock(msgbuf.hdr.id, msgbuf.block);
+            if (rc < 0) {
+                fprintf(stderr, "writeblock error\n");
+                msgbuf.hdr.flags = BSOP_FLAG_ERROR;
+                send_reply(&from, (void *)&msgbuf, MSGBUFSIZE_ID);
+                continue;
+            }
+            msgbuf.hdr.flags = 0;
+            send_reply(&from, (void *)&msgbuf, MSGBUFSIZE_ID);
+            break;
+        case BSOP_ALLOCBLOCK:
+            /* request carries the block; the reply returns the new id */
+            if ((size_t)len < MSGBUFSIZE_BLOCK) {
+                fprintf(stderr, "Short packet (allocblock %u).\n", len);
+                continue;
+            }
+            bid = allocblock(msgbuf.block);
+            if (bid == ALLOCFAIL) {
+                fprintf(stderr, "allocblock error\n");
+                msgbuf.hdr.flags = BSOP_FLAG_ERROR;
+                send_reply(&from, (void *)&msgbuf, MSGBUFSIZE_ID);
+                continue;
+            }
+            msgbuf.hdr.id = bid;
+            msgbuf.hdr.flags = 0;
+            send_reply(&from, (void *)&msgbuf, MSGBUFSIZE_ID);
+            break;
+        default:
+            /* no reply is defined for unknown requests; log and drop */
+            fprintf(stderr, "Unknown operation %u.\n",
+                    (unsigned int)msgbuf.hdr.operation);
+            break;
+        }
+
+    }
+}
+/**
+ * readblock_into: read a block from disk into a caller-supplied buffer
+ *   @id: block id to read (ids start at 1; block 1 sits at file offset 0)
+ *   @block: pointer to buffer to receive block
+ *
+ *   @return: 0 if OK, -1 on error
+ */
+
+int readblock_into(u64 id, void *block) {
+    off64_t offset = ((off64_t) id - 1LL) * BLOCK_SIZE;
+
+    if (lseek64(block_fp, offset, SEEK_SET) < 0) {
+        printf ("%Ld\n", (id - 1) * BLOCK_SIZE);
+        perror("readblock lseek");
+        return -1;
+    }
+
+    /* anything other than a full block counts as a failure */
+    if (read(block_fp, block, BLOCK_SIZE) == BLOCK_SIZE)
+        return 0;
+
+    perror("readblock read");
+    return -1;
+}
+
+/**
+ * writeblock: write an existing block to disk
+ *   @id: block id (ids start at 1; block 1 sits at file offset 0)
+ *   @block: pointer to block
+ *
+ *   @return: zero on success, -1 on failure
+ */
+int writeblock(u64 id, void *block) {
+    ssize_t written;
+
+    if (lseek64(block_fp, ((off64_t) id - 1LL) * BLOCK_SIZE, SEEK_SET) < 0) {
+        perror("writeblock lseek");
+        return -1;
+    }
+    written = write(block_fp, block, BLOCK_SIZE);
+    if (written < 0) {
+        perror("writeblock write");
+        return -1;
+    }
+    /* A partial write leaves a torn block on disk; report it as a
+     * failure, as allocblock() does, rather than as success.
+     */
+    if (written != BLOCK_SIZE) {
+        fprintf(stderr, "writeblock short write\n");
+        return -1;
+    }
+    return 0;
+}
+
+/**
+ * allocblock: append a new block to the store file
+ *   @block: pointer to block
+ *
+ * With BS_ALLOC_HACK defined, the block is appended repeatedly until the
+ * resulting id reaches BS_ALLOC_SKIP, so lower ids are never handed out.
+ *
+ *   @return: new id of block on disk, ALLOCFAIL on error
+ */
+static u64 lastblock = 0;
+
+u64 allocblock(void *block) {
+    u64 new_id;
+    off64_t end;
+
+    for (;;) {
+        end = lseek64(block_fp, 0, SEEK_END);
+        if (end == (off64_t)-1) {
+            perror("allocblock lseek");
+            return ALLOCFAIL;
+        }
+        if (end % BLOCK_SIZE != 0) {
+            fprintf(stderr, "file size not multiple of %d\n", BLOCK_SIZE);
+            return ALLOCFAIL;
+        }
+        if (write(block_fp, block, BLOCK_SIZE) != BLOCK_SIZE) {
+            perror("allocblock write");
+            return ALLOCFAIL;
+        }
+        /* ids are 1-based: the block at offset 0 has id 1 */
+        new_id = end / BLOCK_SIZE + 1;
+
+#ifdef BS_ALLOC_HACK
+        if (new_id < BS_ALLOC_SKIP)
+            continue;
+#endif
+        break;
+    }
+
+    /* sanity check: ids handed out should only ever grow */
+    if (new_id <= lastblock)
+        printf("[*** %Ld alredy allocated! ***]\n", new_id);
+
+    lastblock = new_id;
+    return new_id;
+}
+
+/**
+ * newblock: allocate an in-memory block filled with zeros
+ *
+ *   @return: pointer to new BLOCK_SIZE block, NULL on error
+ */
+void *newblock() {
+    void *zeroed = calloc(1, BLOCK_SIZE);
+
+    if (zeroed == NULL)
+        perror("newblock");
+    return zeroed;
+}
+
+
+/**
+ * freeblock: release an in-memory block
+ *   @block: block to be freed (NULL is accepted and ignored)
+ */
+void freeblock(void *block) {
+    /* free() already treats NULL as a no-op */
+    free(block);
+}
+
+
+/* Daemon entry point: open the store file and the service socket, then
+ * serve requests.  Returns -1 if either cannot be opened.
+ */
+int main(int argc, char **argv)
+{
+    int status = -1;
+
+    block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644);
+    if (block_fp < 0) {
+        perror("open");
+    } else {
+        bssock = open_socket(BLOCKSTORED_PORT);
+        if (bssock >= 0) {
+            service_loop();
+
+            close(bssock);
+            status = 0;
+        }
+    }
+
+    return status;
+}
diff --git a/tools/blktap/bstest.c b/tools/blktap/bstest.c
new file mode 100644 (file)
index 0000000..5476ea1
--- /dev/null
@@ -0,0 +1,191 @@
+/**************************************************************************
+ * 
+ * bstest.c
+ *
+ * Block store daemon test program.
+ *
+ * usage: bstest <host>|X {r|w|a} ID 
+ *
+ */
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <errno.h>
+#include "blockstore.h"
+
+/**
+ * direct: send one raw request to a blockstore daemon and print the reply
+ *   @host: name of the host running the daemon
+ *   @op: BSOP_* operation code to send
+ *   @id: value for the request's id field
+ *   @len: number of bytes of the (zero-filled) message to send
+ *
+ * Exits the process on any resolver or socket error.
+ *
+ *   @return: 0
+ */
+int direct(char *host, u32 op, u64 id, int len) {
+    struct sockaddr_in sn, peer;
+    int sock;
+    bsmsg_t msgbuf;
+    int rc;
+    socklen_t slen;   /* recvfrom() takes a socklen_t *, not an int * */
+    struct hostent *addr;
+
+    addr = gethostbyname(host);
+    if (!addr) {
+        /* gethostbyname() reports failure through h_errno, not errno */
+        herror("bad hostname");
+        exit(1);
+    }
+    memset(&peer, 0, sizeof(peer));
+    peer.sin_family = addr->h_addrtype;
+    peer.sin_port = htons(BLOCKSTORED_PORT);
+    peer.sin_addr.s_addr =  ((struct in_addr *)(addr->h_addr))->s_addr;
+    fprintf(stderr, "Sending to: %u.%u.%u.%u\n",
+            (unsigned int)(unsigned char)addr->h_addr[0],
+            (unsigned int)(unsigned char)addr->h_addr[1],
+            (unsigned int)(unsigned char)addr->h_addr[2],
+            (unsigned int)(unsigned char)addr->h_addr[3]);
+
+    sock = socket(AF_INET, SOCK_DGRAM, 0);
+    if (sock < 0) {
+        perror("Bad socket");
+        exit(1);
+    }
+    memset(&sn, 0, sizeof(sn));
+    sn.sin_family = AF_INET;
+    sn.sin_port = htons(BLOCKSTORED_PORT);
+    sn.sin_addr.s_addr = htonl(INADDR_ANY);
+    if (bind(sock, (struct sockaddr *)&sn, sizeof(sn)) < 0) {
+        perror("bind");
+        close(sock);
+        exit(1);
+    }
+
+    /* bsmsg_t keeps operation/flags/id inside its 'hdr' member */
+    memset((void *)&msgbuf, 0, sizeof(msgbuf));
+    msgbuf.hdr.operation = op;
+    msgbuf.hdr.id = id;
+
+    rc = sendto(sock, (void *)&msgbuf, len, 0,
+                (struct sockaddr *)&peer, sizeof(peer));
+    if (rc < 0) {
+        perror("sendto");
+        exit(1);
+    }
+
+    slen = sizeof(peer);
+    len = recvfrom(sock, (void *)&msgbuf, sizeof(msgbuf), 0,
+                   (struct sockaddr *)&peer, &slen);
+    if (len < 0) {
+        perror("recvfrom");
+        exit(1);
+    }
+
+    /* Print only the fields the reply was long enough to contain. */
+    printf("Reply %u bytes:\n", len);
+    if ((size_t)len >= MSGBUFSIZE_OP)
+        printf("  operation: %u\n", (unsigned int)msgbuf.hdr.operation);
+    if ((size_t)len >= MSGBUFSIZE_FLAGS)
+        printf("  flags: 0x%x\n", (unsigned int)msgbuf.hdr.flags);
+    if ((size_t)len >= MSGBUFSIZE_ID)
+        printf("  id: %llu\n", (unsigned long long)msgbuf.hdr.id);
+    if ((size_t)len >= (MSGBUFSIZE_ID + 4))
+        printf("  data: %02x %02x %02x %02x...\n",
+               (unsigned int)msgbuf.block[0],
+               (unsigned int)msgbuf.block[1],
+               (unsigned int)msgbuf.block[2],
+               (unsigned int)msgbuf.block[3]);
+
+    /* sock is known to be valid here (0 is a legal descriptor too) */
+    close(sock);
+
+    return 0;
+}
+
+/* Test driver.  With "X" as the host the request goes through the
+ * blockstore client library; with any other host a raw request is sent
+ * to that host's daemon via direct().
+ *
+ * usage: bstest <host>|X {r|w|a} ID
+ */
+int main (int argc, char **argv) {
+
+    u32 op = 0;
+    u64 id = 0;
+    int len = 0, rc;
+    void *block;
+
+    if (argc < 3) {
+        fprintf(stderr, "usage: bstest <host>|X {r|w|a} ID\n");
+        return 1;
+    }
+
+    switch (argv[2][0]) {
+    case 'r':
+    case 'R':
+        op = BSOP_READBLOCK;
+        len = MSGBUFSIZE_ID;
+        break;
+    case 'w':
+    case 'W':
+        op = BSOP_WRITEBLOCK;
+        len = MSGBUFSIZE_BLOCK;
+        break;
+    case 'a':
+    case 'A':
+        op = BSOP_ALLOCBLOCK;
+        len = MSGBUFSIZE_BLOCK;
+        break;
+    default:
+        fprintf(stderr, "Unknown action '%s'.\n", argv[2]);
+        return 1;
+    }
+
+    if (argc >= 4) {
+        /* strtoull() covers the full u64 range and lets malformed input
+         * be rejected instead of silently becoming 0.
+         */
+        char *end;
+
+        errno = 0;
+        id = strtoull(argv[3], &end, 10);
+        if (end == argv[3] || *end != '\0' || errno == ERANGE) {
+            fprintf(stderr, "Bad ID '%s'.\n", argv[3]);
+            return 1;
+        }
+    }
+
+    if (strcmp(argv[1], "X") == 0) {
+        rc = __init_blockstore();
+        if (rc < 0) {
+            fprintf(stderr, "blockstore init failed.\n");
+            return 1;
+        }
+        switch(op) {
+        case BSOP_READBLOCK:
+            block = readblock(id);
+            if (block) {
+                printf("data: %02x %02x %02x %02x...\n",
+                       (unsigned int)((unsigned char*)block)[0],
+                       (unsigned int)((unsigned char*)block)[1],
+                       (unsigned int)((unsigned char*)block)[2],
+                       (unsigned int)((unsigned char*)block)[3]);
+                freeblock(block);
+            }
+            else {
+                /* report failure like the write and alloc paths do */
+                printf("error\n");
+            }
+            break;
+        case BSOP_WRITEBLOCK:
+            block = malloc(BLOCK_SIZE);
+            if (!block) {
+                perror("bstest malloc");
+                return 1;
+            }
+            memset(block, 0, BLOCK_SIZE);
+            rc = writeblock(id, block);
+            if (rc != 0) {
+                printf("error\n");
+            }
+            else {
+                printf("OK\n");
+            }
+            free(block);
+            break;
+        case BSOP_ALLOCBLOCK:
+            block = malloc(BLOCK_SIZE);
+            if (!block) {
+                perror("bstest malloc");
+                return 1;
+            }
+            memset(block, 0, BLOCK_SIZE);
+            /* here the ID argument is passed as the allocation hint */
+            id = allocblock_hint(block, id);
+            if (id == 0) {
+                printf("error\n");
+            }
+            else {
+                printf("ID: %llu\n", (unsigned long long)id);
+            }
+            free(block);
+            break;
+        }
+    }
+    else {
+        direct(argv[1], op, id, len);
+    }
+
+
+    return 0;
+}